package org.luo.lan.server.handlers;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.internal.AppendableCharSequence;
import org.luo.lan.common.WaitWriteTask;
import org.luo.lan.common.util.Pair;
import lombok.extern.slf4j.Slf4j;
import org.luo.lan.common.handler.AbstractHanderController;
import org.luo.lan.common.handler.ControlTypeConstant;
import org.luo.lan.common.handler.Controller;
import org.luo.lan.common.handler.Request;
import org.luo.lan.common.util.StringUtils;
import org.luo.lan.server.config.ServerConfig;
import org.luo.lan.server.config.ServerInfo;
import org.luo.lan.server.constants.AttrConstants;
import org.luo.lan.server.factory.BridgeChannelFactory;
import org.luo.lan.server.factory.SessionFactory;
import org.luo.lan.server.util.HostReplaceInfo;
import org.luo.lan.server.util.HttpHeaderParser;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Auther: luobiao
 * @Date: 2020/9/19 09:15
 * @Description:
 */
@Slf4j
public class ServerBridgeHandler extends SimpleChannelInboundHandler<Request> {
    private static final int MAX_HEART_BEAT_FAIL_TIMES = 3 ; //最大心跳超时失败次数
    private static final int HEART_BEAT_INTERVAL_TIME = 6; //心跳间隔时间
    private AtomicInteger failTimes = new AtomicInteger(0);

    private final String HTTP_HOST_HEADER_PREFIX = "Host:";
    private ThreadLocal<HostReplaceInfo> HOST_INDEX_LENGTH = new ThreadLocal<>();
    private Map<String, BlockingQueue<Runnable>> queueMap = new HashMap<>();
    private ReentrantLock createChannelLock = new ReentrantLock();

    private Controller controller;

    public ServerBridgeHandler(){
        this.controller=new ServerBridgeController();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Request request) {
        controller.control(ctx, request);
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state()== IdleState.READER_IDLE){
                Channel channel = ctx.channel();
                String clientKey=channel.attr(AttrConstants.CLIENT_KEY).get();
                log.debug(HEART_BEAT_INTERVAL_TIME + "秒内没有收到客户端" + clientKey + "的心跳包！");
                if (failTimes.get() >= MAX_HEART_BEAT_FAIL_TIMES) {
                    log.error((HEART_BEAT_INTERVAL_TIME * MAX_HEART_BEAT_FAIL_TIMES) + "秒内没有收到客户端" + clientKey + "的心跳包！，关闭桥接通道");
                    ctx.close();
                } else {
                    failTimes.incrementAndGet();
                    log.debug("累加客户端" + clientKey + "失败次数，当前值失败次数为" + failTimes.get());
                }
            }else {
                super.userEventTriggered(ctx,evt);
            }
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.debug(ctx+"桥接通道销毁中...");
        NioSocketChannel bridgeChannel=(NioSocketChannel) ctx.channel();
        String clientKey = bridgeChannel.attr(AttrConstants.CLIENT_KEY).get();
        //如果是auth的时候未成功则clientKey值还未设置，就不需要进行执行remove操作了
        if (StringUtils.isNotBlank(clientKey)) {
            BridgeChannelFactory.INSTANCE.removeBridgeChannel(clientKey);
            BridgeChannelFactory.INSTANCE.closeTcpTunnelChannel(clientKey);
            SessionFactory.INSTANCE.closeSeesionChannel(bridgeChannel);//只需要关闭相关代理通道即可，sessionKey的移除操作在各自的channelInactive方法中进行
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        if (!(cause instanceof IOException)) {
            cause.printStackTrace();
        }
        log.info(cause.getMessage());
    }

    public class ServerBridgeController extends AbstractHanderController<ChannelHandlerContext> {

        @Override
        public void auth(ChannelHandlerContext ctx, Request request) {
            String clientKey = new String(request.getBody());
            ServerInfo.Client client=ServerConfig.getInstance().getClient(clientKey);
            Request retRequest;
            if (client != null) {
                //检测当前需要连接的客户端是否包含已启动的域名或端口
                synchronized (BridgeChannelFactory.INSTANCE) { //保证判断是否存在和创建是一个原子操作
                    Pair<Boolean,String> checkPair=BridgeChannelFactory.INSTANCE.isExist(clientKey);
                    if (checkPair.getKey()) {
                        log.error(clientKey+" 客户端授权失败，"+checkPair.getValue());
                        retRequest = Request.FAIL(ControlTypeConstant.AUTH, checkPair.getValue());
                    } else {
                        NioSocketChannel channel=(NioSocketChannel) ctx.channel();
                        channel.attr(AttrConstants.CLIENT_KEY).set(clientKey);
                        Pair<Boolean,String> createPair=BridgeChannelFactory.INSTANCE.createBridgeChannel(clientKey, channel);
                        if (createPair.getKey()) {
                            ctx.pipeline().addFirst("idle",new IdleStateHandler(HEART_BEAT_INTERVAL_TIME, 0, 0));
                            retRequest = Request.SUCCESS(ControlTypeConstant.AUTH,"授权成功！");
                            log.debug("客户端{}授权成功" ,request.getStringBody());
                        }else{
                            log.error(clientKey+" 创建桥接通道失败，"+createPair.getValue());
                            retRequest = Request.FAIL(ControlTypeConstant.AUTH, createPair.getValue());
                        }
                    }
                }
            }else{
                log.error(clientKey+" 客户端授权失败，客户端密钥错误！");
                retRequest = Request.FAIL(ControlTypeConstant.AUTH, "客户端秘钥错误！");
            }
            ctx.writeAndFlush(retRequest);
        }

        @Override
        public void transport(ChannelHandlerContext ctx, Request request) {
             String sessionKey = request.getStringHeader();
            NioSocketChannel proxyChannel=SessionFactory.INSTANCE.getSessionChannel(sessionKey);
            if (proxyChannel != null) {
                String response=request.getStringBody();
            log.debug("目标响应请求的channel={}，：",proxyChannel);
/*            log.debug("---------------------------------目标响应内容为begin---------------------------------------");
            log.debug(response);
            log.debug("-----------------------------------目标响应内容为end-------------------------------------");*/
                ByteBuf byteBuf = Unpooled.wrappedBuffer(request.getBody());
                proxyChannel.writeAndFlush(byteBuf);
            }else{
                log.error("会话不存在！sessionKey={}，type={},body={},ctx={}",sessionKey,request.getType(),request.getStringBody(),ctx);
            }
        }

        @Override
        public void heartbeat(ChannelHandlerContext ctx, Request request) {
            String clientKey=ctx.channel().attr(AttrConstants.CLIENT_KEY).get();
            if (failTimes.get() > 0) {
                log.debug("服务器端收客户端"+clientKey +"的心跳包，当前失败次数为"+failTimes.get()+"，现重置失败次数为0！");
            }
            failTimes.set(0);
            ctx.writeAndFlush(Request.SUCCESS(ControlTypeConstant.HEARTBEAT, "PONE"));
        }

        @Override
        public void closeSession(ChannelHandlerContext ctx, Request request) {
            String sessionKey = request.getStringHeader();
            NioSocketChannel proxyChannel=SessionFactory.INSTANCE.getSessionChannel(sessionKey);
            if (proxyChannel != null) {
                log.debug("关闭会话通道，{}",request.getStringBody());
                proxyChannel.attr(AttrConstants.IS_MANUAL_CLOSE).set(true);
                proxyChannel.close();
            }
        }

        @Override
        public void proxy(ChannelHandlerContext ctx, Request request) {
            ByteBuf byteBuf = Unpooled.wrappedBuffer(request.getBody());
            String sessionKey =request.getStringHeader();
            NioSocketChannel targetChannel = SessionFactory.INSTANCE.getSessionChannel(sessionKey);
            if (targetChannel == null) {
                try{
                    if(createChannelLock.tryLock(3, TimeUnit.SECONDS)){
                        targetChannel = SessionFactory.INSTANCE.getSessionChannel(sessionKey);
                        if (targetChannel == null) {
                            if (queueMap.containsKey(sessionKey)) {
                                BlockingQueue<Runnable> queue=queueMap.get(sessionKey);
                                if (queue != null) {
                                    queue.add(new WaitWriteTask(()->{
                                        NioSocketChannel channel = SessionFactory.INSTANCE.getSessionChannel(sessionKey);
                                        channel.writeAndFlush(byteBuf);
                                    }));
                                }
                            }else{
                                boolean isConnect=isConnect(byteBuf);
                                String host_port = getHost(byteBuf);
                                String[] hostPortArr=host_port.trim().split(":");
                                String host = hostPortArr[0];
                                Integer port=80;
                                if (hostPortArr.length == 2) {
                                    port = Integer.valueOf(hostPortArr[1]);
                                }
                                queueMap.put(sessionKey, new LinkedBlockingQueue<>());
                                SessionFactory.INSTANCE.createSession(ctx, host, port, sessionKey, (p -> {
                                    if (p == null) {
                                        log.error("创建代理会话channel失败，sessionKey={}",sessionKey);
                                        ctx.channel().writeAndFlush(Request.SUCCESS(ControlTypeConstant.CLOSE_SESSION, "客户端创建代理会话通道失败，请求关闭服务器会话通道！").addHeader(sessionKey));
                                        queueMap.remove(sessionKey);
                                    }else{
                                        p.attr(AttrConstants.SEESION_KEY).set(sessionKey);
                                        p.attr(AttrConstants.BRIDGE_CHANNEL).set((NioSocketChannel)ctx.channel());
                                        p.attr(AttrConstants.IS_MANUAL_CLOSE).set(false);
                                        if (isConnect) {
                                            log.debug("----------------"+host+" connect 请求，直接回复 HTTP/1.1 200 Connection Established Connection: close-------------------");
                                            byte[] establishedBytes=new byte[]{72,84,84,80,47,49,46,49,32,50,48,48,32,67,111,110,110,101,99,116,105,111,110,32,69,115,116,97,98,108,105,115,104,101,100,13,10,67,111,110,110,101,99,116,105,111,110,58,32,99,108,111,115,101,13,10,13,10};
                                            ctx.channel().writeAndFlush(Request.SUCCESS(ControlTypeConstant.PROXY,establishedBytes).addHeader(sessionKey));
                                        }else{
                                            p.writeAndFlush(byteBuf);
                                        }
                                        BlockingQueue<Runnable> queue=queueMap.get(sessionKey);
                                        while (queue.size() > 0) {
                                            try {
                                                queue.take().run();
                                            } catch (InterruptedException e) {
                                                log.error("出队线程被中断{}"+e);
                                            }
                                        }
                                        queueMap.remove(sessionKey);
                                    }
                                }));
                            }
                        }else{
                            targetChannel.writeAndFlush(byteBuf);
                        }
                    }else{
                        log.error("获取锁超时！");
                    }
                } catch (InterruptedException e) {
                    log.error("线程被中断！e={}",e);
                    queueMap.remove(sessionKey);
                } finally {
                    createChannelLock.unlock();
                }
                return;
            }
            targetChannel.writeAndFlush(byteBuf);
        }

    }


    private boolean isConnect(ByteBuf byteBuf) {
        ByteBuf sliceByteBuf=byteBuf.slice(0, 7);
        byte[] bytes = new byte[sliceByteBuf.readableBytes()];
        sliceByteBuf.readBytes(bytes);
        if ("CONNECT".equals(new String(bytes))) {
            return true;
        }
        return false;
    }

    private String getHost(ByteBuf byteBuf) {
        ByteBuf sliceByteBuf=byteBuf.slice(0, byteBuf.readableBytes());
        HOST_INDEX_LENGTH.set(null);
        AppendableCharSequence seq = new AppendableCharSequence(128);
        HttpHeaderParser headerParser = new HttpHeaderParser(seq, 8192);
        AppendableCharSequence line = headerParser.parse(sliceByteBuf);
        if (line == null) {
            return null;
        }
        String host = "";
        do {
            line = headerParser.parse(sliceByteBuf);
            if (line == null) {
                break;
            }
            if (line.length() > HTTP_HOST_HEADER_PREFIX.length() && line.substring(0, HTTP_HOST_HEADER_PREFIX.length()).equalsIgnoreCase(HTTP_HOST_HEADER_PREFIX)) {
                String host_port = line.substring(5, line.length());
                host = host_port;
                break;
            }
        } while (line.length() > 0);
        return host;
    }
}
